chore: refactor concurrency model in the bigtable session protocol #13485

Draft
igorbernstein2 wants to merge 25 commits into googleapis:main from igorbernstein2:threading-refactor-v2

@igorbernstein2

Contributor

No description provided.

Document lock ownership before later steps remove the lock. Add @GuardedBy
to fields under the lock, move heartbeatInterval read into the synchronized
block in startRpc(), and comment fields intentionally read outside the lock.
No runtime change.

Replace ScheduledExecutorService with a BigtableTimer (Netty hashed wheel,
will move in-tree later) for heartbeat, deadline, watchdog, AFE-prune,
retry-create-session, and retry-delay. Owned by Client and shared across
pools.

Replace CancellableVRpc with a VOperation layer that sits above the VRpc
chain rather than inside it. VOperationImpl owns the gRPC Context
cancellation listener and constructs the per-op VRpcCallContext; downstream
middleware just sees the chain.

VRpcCallContext.getExecutor() returns OpExecutor (thin wrapper with
runningThread affinity tracking). VOperationImpl constructs the per-call
SynchronizationContext + OpExecutor; RetryingVRpc drops its own SyncContext
and dispatches via ctx.getExecutor(). The uncaught-handler safety net moves
from RetryingVRpc up to VOperationImpl.

VRpcImpl.handle*() methods now dispatch listener callbacks via
ctx.getExecutor(), with CAS STARTED->CLOSED in all three (handleError no
longer proceeds from NEW) and decode moved into the executor task.
RetryingVRpc.Active drops its own wrap since callbacks already arrive on
the op executor. start() publishes ctx/listener only after winning the CAS
so a racing duplicate can't corrupt the winner's fields. SessionPoolImpl's
three direct listener.onClose paths also dispatch via ctx.getExecutor().
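
The close-once rule described above can be illustrated with a small sketch. `SketchCall` and its members are hypothetical names, not the PR's `VRpcImpl`; the sketch also assumes that transport callbacks cannot arrive until `start()` has returned, which is why the listener can be published after the CAS.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for a call object; not the PR's VRpcImpl.
final class SketchCall {
  enum State { NEW, STARTED, CLOSED }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
  private final Executor opExecutor;
  // Written only by the caller that wins the NEW -> STARTED CAS.
  // Assumption: handleError() is never invoked before start() returns.
  private volatile Consumer<String> listener;

  SketchCall(Executor opExecutor) {
    this.opExecutor = opExecutor;
  }

  /** Returns false for a duplicate start; the loser never touches the fields. */
  boolean start(Consumer<String> listener) {
    if (!state.compareAndSet(State.NEW, State.STARTED)) {
      return false;
    }
    this.listener = listener;
    return true;
  }

  /** Closes at most once, and only from STARTED; the callback runs on the executor. */
  boolean handleError(String status) {
    if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
      return false; // still NEW, or already closed by another path
    }
    Consumer<String> target = listener;
    opExecutor.execute(() -> target.accept(status));
    return true;
  }
}
```

With a direct executor, a second `handleError` call or a call made before `start()` returns `false` and the listener sees exactly one terminal status.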

gemini-code-assist (Bot) left a comment

Contributor

Code Review

This pull request refactors the threading and execution model of the Bigtable client by introducing a per-operation serializing executor (OpExecutor), a unified operation wrapper (VOperationImpl), and a hashed-wheel timer (BigtableTimer/NettyWheelTimer) to handle scheduling tasks like heartbeats and deadlines without stalling Netty I/O threads. Additionally, SessionImpl and SessionPoolImpl are refactored to use SynchronizationContext instead of synchronized blocks. The review feedback highlights critical thread-safety and robustness issues, including potential visibility bugs from removing volatile on exposed session parameters, a potential NullPointerException in PendingVRpc.cancel when ctx is null, and the need to guard PendingVRpc.start against already-cancelled operations.

Comment on lines +109 to +111
      private OpenParams openParams;

    - private volatile boolean openParamsUpdated;
    + private boolean openParamsUpdated;

Contributor

high

Removing the volatile keyword from openParams and openParamsUpdated introduces a Java Memory Model (JMM) visibility and data-race issue. Even if there are no off-context production readers today, these fields are exposed via public getters (such as getOpenParams() and isOpenParamsUpdated()). Any external thread (like SessionPoolImpl or background threads) calling these getters directly without executing on the sessionSyncContext could read stale values or fail to see updates entirely. Keeping them volatile is crucial for thread safety and defensive programming.

Suggested change
    - private OpenParams openParams;
    - private boolean openParamsUpdated;
    + private volatile OpenParams openParams;
    + private volatile boolean openParamsUpdated;

Comment on lines 739 to +743
      private void cancel(Status status, boolean onlyCancelPendingCall) {
    -   boolean delegateToRealCall = true;
        synchronized (SessionPoolImpl.this) {
    -     if (isCancelled) {
    -       return;
    -     }
          pendingRpcs.remove(this); // eager removal; no-op if already drained
        }
    +   ctx.getExecutor().execute(() -> {

Contributor

high

If cancel is called on a PendingVRpc before start() has been invoked, ctx will be null, leading to a NullPointerException when calling ctx.getExecutor(). To prevent this, we should check if ctx is null under the SessionPoolImpl.this monitor lock, set isCancelled = true, and return early. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.

    private void cancel(Status status, boolean onlyCancelPendingCall) {
      synchronized (SessionPoolImpl.this) {
        pendingRpcs.remove(this); // eager removal; no-op if already drained
        if (ctx == null) {
          isCancelled = true;
          return;
        }
      }
      ctx.getExecutor().execute(() -> {
References
  1. In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.

Comment on lines 693 to 701
      synchronized (SessionPoolImpl.this) {
        if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
    -     listener.onClose(
    +     VRpcResult result =
              VRpcResult.createUncommitedError(
                  Status.UNAVAILABLE.withCause(
    -                 new IllegalStateException("SessionPool is closed"))));
    +                 new IllegalStateException("SessionPool is closed")));
    +     ctx.getExecutor().execute(() -> listener.onClose(result));
          return;
        }

Contributor

medium

To complement the pre-start cancellation guard, PendingVRpc.start should check if isCancelled is already true when called, and if so, fast-fail immediately instead of queueing the RPC in pendingRpcs. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.

Suggested change
    synchronized (SessionPoolImpl.this) {
      if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
        VRpcResult result =
            VRpcResult.createUncommitedError(
                Status.UNAVAILABLE.withCause(
                    new IllegalStateException("SessionPool is closed")));
        ctx.getExecutor().execute(() -> listener.onClose(result));
        return;
      }
      if (isCancelled) {
        VRpcResult result = VRpcResult.createRejectedError(Status.CANCELLED);
        ctx.getExecutor().execute(() -> listener.onClose(result));
        return;
      }
References
  1. In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
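
To make the two review points above concrete (a null-safe cancel before start, and a start that fails fast once cancelled), here is a minimal self-contained sketch. `SketchPendingCall` and every member in it are hypothetical; it mirrors only the shape of the guard, not the actual `PendingVRpc` code, and it uses string statuses in place of `VRpcResult`.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical miniature of a queued call; names are illustrative only.
final class SketchPendingCall {
  private final Object poolLock = new Object();
  private boolean cancelled;          // guarded by poolLock
  private boolean queued;             // guarded by poolLock
  private Executor executor;          // null until start(); guarded by poolLock
  private Consumer<String> listener;  // null until start(); guarded by poolLock

  void start(Executor executor, Consumer<String> listener) {
    boolean failFast;
    synchronized (poolLock) {
      this.executor = executor;
      this.listener = listener;
      failFast = cancelled;
      if (!failFast) {
        queued = true; // only a live call is committed to the queue
      }
    }
    if (failFast) {
      executor.execute(() -> listener.accept("CANCELLED"));
    }
  }

  void cancel() {
    Executor target;
    Consumer<String> callback;
    synchronized (poolLock) {
      if (cancelled) {
        return;
      }
      cancelled = true;
      queued = false;
      target = executor;
      callback = listener;
    }
    if (target == null) {
      return; // cancelled before start(): start() reports the cancellation
    }
    target.execute(() -> callback.accept("CANCELLED"));
  }

  boolean isQueued() {
    synchronized (poolLock) {
      return queued;
    }
  }
}
```

In this sketch a cancel that arrives first only records the flag; the listener is notified exactly once, on the executor, when `start()` eventually runs.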

The pool-wide heartbeat monitor on main only inspected sessions in
inUseSessions (i.e. with an active vRPC). When heartbeat scheduling
moved into SessionImpl, the per-session timer was instead armed once on
ready and self-rescheduled indefinitely. The "no active vRPC" case
was reduced to a sentinel (nextHeartbeat = now + FUTURE_TIME), which
made checkHeartbeat no-op but kept the wheel ticking. It also left a
latent force-close on idle sessions: handleHeartBeatResponse
unconditionally resets nextHeartbeat to now + heartbeatInterval, so an
idle session that received a server heartbeat became eligible for
force-close on the next missed beat.

Tie the timer to the vRPC lifecycle to match main's semantics:

- handleOpenSessionResponse no longer schedules the tick.
- startRpc arms the tick after setting nextHeartbeat.
- handleVRpcResponse / handleVRpcErrorResponse cancel the tick.
- updateState already cancels the tick on transition past READY.

Add SessionImplTest coverage with a counting BigtableTimer wrapper that
asserts (a) no schedule before any vRPC and (b) exactly one
schedule/cancel per vRPC lifecycle.
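
The lifecycle rule and the counting-wrapper test idea can be sketched as follows. All names here (`SketchHeartbeat`, `CountingTimer`, `Session`, `onRpcFinished`) are hypothetical stand-ins; the real `BigtableTimer` and `SessionImpl` APIs are not shown in this PR description.

```java
// Hypothetical names throughout; not the PR's BigtableTimer or SessionImpl.
final class SketchHeartbeat {
  /** Counts schedule/cancel calls instead of running anything. */
  static final class CountingTimer {
    int scheduled;
    int cancelled;

    /** Returns a handle that cancels the tick when run. */
    Runnable schedule(Runnable tick) {
      scheduled++;
      return () -> cancelled++;
    }
  }

  static final class Session {
    private final CountingTimer timer;
    private Runnable cancelTick; // non-null only while a vRPC is in flight

    Session(CountingTimer timer) {
      this.timer = timer;
    }

    void onReady() {
      // Intentionally does not arm the heartbeat tick.
    }

    void startRpc() {
      cancelTick = timer.schedule(() -> {});
    }

    void onRpcFinished() {
      if (cancelTick != null) {
        cancelTick.run();
        cancelTick = null;
      }
    }
  }
}
```

An idle session in this sketch never touches the timer, and one vRPC produces exactly one schedule and one cancel.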

SessionImpl gains sessionSyncContext that serializes stream callbacks.
onMessage/onClose dispatch onto it, and the per-session heartbeat tick
trampolines through it. synchronized(lock) blocks remain inside the
handlers — the two coexist for now. Affinity asserts added at boundary
methods and every handle*.

SessionImpl.startRpc and cancelRpc now submit to sessionSyncContext rather
than running synchronously on the caller. VRpcSessionApi.startRpc is void
— errors flow through rpc.handleError() onto ctx.getExecutor(). VRpcImpl
drops its synchronous post-startRpc error branch.

All session state mutations now run on sessionSyncContext, so the per-session
Object lock is no longer needed. Public methods (start, close, forceClose,
startRpc, cancelRpc) submit onto sessionSyncContext; nextRpcId becomes
AtomicLong for the cross-thread newCall() caller. handleVRpcResponse and
handleVRpcErrorResponse drop the localCancel/localRpc capture-and-recheck
dance — sessionSyncContext serializes them now. Stale lock-era comments
on the fields are replaced with a sessionSyncContext-ownership note.
SessionImplTest.testHeartbeat polls for the now-async nextHeartbeat update.

Split terminal close into notifyTerminalClose (per-target try/catch fan-out)
and abortFromUncaughtException (global handler). Uncaught syncContext
exceptions always set closeReason to ERROR — the prior reason is folded into
the description so tracer/metrics correctly attribute aborts.

notifyTerminalClose synthesizes a fallback closeReason if missing: every
caller sets it today (forceClose, startGracefulClose, dispatchStreamClosed,
abortFromUncaughtException), but a future writer who forgets would NPE
inside the fan-out — and the throw escapes to the syncContext uncaught
handler, which early-returns on the already-CLOSED state and silently skips
the remaining cleanup. The synthesizer mirrors startGracefulClose: log a
warning with an IllegalStateException for stack-trace observability, then
build a fallback CloseSessionRequest so the rest of the fan-out runs.

Adds three regression tests (listener.onReady throws, onClose throws,
both throw).

Client (and ShimImpl) own a dedicated bigtable-callback-%d cached pool,
plumbed through *Async / TableBase. A blocked user callback can no longer
starve heartbeats, retry delays, or pool bookkeeping (all of which run on
backgroundExecutor). The op-level SerializingExecutor in a later commit
will dispatch onto this pool.

VOperationImpl now constructs OpExecutor over a per-call SequentialExecutor
on the shared userCallbackExecutor, replacing the per-call
SynchronizationContext. OpExecutor gains an UncaughtExceptionHandler ctor
arg — the safety net that the removed RetryingVRpc-owned SyncContext
provided. The 3-arg VRpcCallContext.create defaults to a no-op handler for
tests; production callers go through VOperationImpl.

CallOptions.withExecutor(MoreExecutors.directExecutor()) on the session
stream so Netty I/O threads deliver SessionStream.Listener callbacks
directly; sessionSyncContext immediately trampolines off them.

Tests that call Future.get() on vRpc chains can hang indefinitely if an
exception breaks the callback dispatch chain and orphans the future (see
ISSUE-006). A class-level JUnit 5 @Timeout(30) converts a silent hang into
a clear test failure within a bounded time.

Applied to: RetryingVRpcTest, VRpcTracerTest, ClientTest, TableBaseTest,
SessionImplTest, SessionPoolImplTest.
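
The same idea, a bounded wait that turns an orphaned future into a visible failure, can be shown without JUnit. This is a plain-JDK analogue rather than the `@Timeout` annotation the commit applies; `SketchBoundedWait` and `awaitOrFail` are hypothetical helper names.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; a stdlib analogue of a test-level timeout.
final class SketchBoundedWait {
  private SketchBoundedWait() {}

  /** Waits up to the limit, then fails loudly instead of hanging forever. */
  static <T> T awaitOrFail(CompletableFuture<T> future, Duration limit) {
    try {
      return future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new AssertionError("future not completed within " + limit, e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new AssertionError("interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new AssertionError("future failed", e.getCause());
    }
  }
}
```

A future that nothing ever completes produces an `AssertionError` after the limit, which is the behavior the class-level timeout gives the listed test classes.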

PendingVRpc.cancel and drainTo move isCancelled/realCall onto
ctx.getExecutor(); the pool lock now covers only queue / poolState /
session list. close() switches to cancelWithResult to honor the new
op-executor contract. NOOP_CALL sentinel removed.

PendingVRpc.start arms the deadline monitor only AFTER committing to the
queue (inside the synchronized block). Previously the timer was scheduled
before the pool-state check, so the closed-pool fast-fail path returned
without cancelling it — the timer then fired later and called
listener.onClose a second time with DEADLINE_EXCEEDED. RetryingVRpc.Active
suppressed the duplicate at the user-facing layer, but
tracer.onAttemptFinish still ran twice and corrupted per-attempt metrics.

Adds SessionPoolImplTest#pendingVRpcOnClosedPoolDoesNotLeakDeadlineMonitor.

VOperationImpl captures opExecutor in start() and trampolines start/cancel
via it. RetryingVRpc.start runs synchronously on the op-executor task;
RetryingVRpc.cancel no longer wraps in execute. Tracer.onOperationStart
reordered before started=true (a throwing tracer short-circuits to direct
listener.onClose). listener.onMessage failures classify as USER_FAILURE.
CleanupListener tracks a closed flag to prevent gRPC-context listener
leaks on synchronous chain close.

…en Client.close

OpExecutor switches from a SequentialExecutor backing to an internal
ArrayDeque + drain loop, and gains runInline() — runs the task synchronously
on the caller thread when the executor is idle, otherwise queues it.
VOperationImpl uses runInline for chain.start so the start dispatch skips
the queue+drain round-trip. Drain rejections (e.g. during shutdown) reset
drainScheduled so subsequent submissions can retry.
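
A rough sketch of a queue-plus-drain-loop executor with an inline fast path is shown below. `SketchOpExecutor` is a hypothetical name and this is not the PR's `OpExecutor`: the real class also tracks thread affinity and takes an uncaught-exception handler, and what happens to the task whose drain was rejected is a policy choice this sketch does not settle (it simply leaves the task queued and rethrows).

```java
import java.util.ArrayDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of a serializing executor; not the PR's OpExecutor.
final class SketchOpExecutor implements Executor {
  private final Executor backing;
  private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
  private boolean draining; // guarded by queue: a drain is scheduled or running

  SketchOpExecutor(Executor backing) {
    this.backing = backing;
  }

  @Override
  public void execute(Runnable task) {
    boolean schedule;
    synchronized (queue) {
      queue.add(task);
      schedule = !draining;
      if (schedule) {
        draining = true;
      }
    }
    if (schedule) {
      try {
        backing.execute(this::drain);
      } catch (RejectedExecutionException e) {
        synchronized (queue) {
          draining = false; // let a later submission try to schedule again
        }
        throw e;
      }
    }
  }

  /** Runs the task on the caller thread when idle; otherwise queues it. */
  void runInline(Runnable task) {
    boolean idle;
    synchronized (queue) {
      idle = !draining && queue.isEmpty();
      if (idle) {
        draining = true;
      }
    }
    if (!idle) {
      execute(task);
      return;
    }
    try {
      task.run();
    } finally {
      drain(); // run anything the task enqueued, then release the drain flag
    }
  }

  private void drain() {
    while (true) {
      Runnable next;
      synchronized (queue) {
        next = queue.poll();
        if (next == null) {
          draining = false;
          return;
        }
      }
      try {
        next.run();
      } catch (RuntimeException e) {
        // Assumption: a real executor would route this to an uncaught-exception handler.
        e.printStackTrace();
      }
    }
  }
}
```

Work submitted from inside a running task is queued and runs after that task returns, so FIFO order is kept even on the inline path.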

Client.close drains userCallbackExecutor first via a 5s-bounded
shutdownAndAwait so pool.close's cancelWithResult onClose notifications
complete before the executor is torn down. Resource.close becomes idempotent.

Drive-by cleanups: replace fully-qualified type names with imports across
touched files, and swap Guava Objects.hashCode(id) for Long.hashCode(id) in
ChannelPoolDpImpl.AfeId to avoid per-call boxing and array allocation.

…lient.close

Client.close shut down userCallbackExecutor before draining the SessionPools
that depend on it, so late listener.onClose tasks from in-flight RPCs
arrived after backing was dead and got RejectedExecutionException — silently
stranding the user's terminal callbacks. The earlier fix sprinkled
inline-drain fallbacks inside OpExecutor; restructure shutdown instead so
the race can't happen.

SessionPool gains awaitTerminated(Duration), backed by a CompletableFuture
that SessionPoolImpl completes from onSessionClose once the pool is CLOSED and
the last session has drained. close() no longer kills the watchdog —
awaitTerminated takes ownership of that, so the watchdog stays alive during
shutdown and can escalate any session stuck in WAIT_SERVER_CLOSE longer
than its tick interval (5 min) via forceClose.

Client.close becomes three explicit phases: (1) initiate graceful close on
each pool, (2) awaitTerminated on each with a 6-minute per-pool budget
(one full watchdog tick plus buffer), (3) tear down userCallbackExecutor /
channelPool / timers in the existing order, now safely because all
listener.onClose tasks are queued or drained before backing dies.
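
The termination future behind phase 2 might look roughly like this. `SketchPool` is a hypothetical miniature: it models only the "closed and last session drained" condition, omits the watchdog entirely, and returns a boolean from `awaitTerminated`, which is an assumption about the signature.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical miniature of a pool's shutdown bookkeeping.
final class SketchPool {
  private final CompletableFuture<Void> terminated = new CompletableFuture<>();
  private int liveSessions; // guarded by this
  private boolean closed;   // guarded by this

  SketchPool(int liveSessions) {
    this.liveSessions = liveSessions;
  }

  /** Phase 1: initiate the close; sessions drain asynchronously. */
  synchronized void close() {
    closed = true;
    completeIfDrained();
  }

  synchronized void onSessionClose() {
    liveSessions--;
    completeIfDrained();
  }

  private void completeIfDrained() {
    if (closed && liveSessions == 0) {
      terminated.complete(null);
    }
  }

  /** Phase 2: bounded wait; returns false if the budget elapsed first. */
  boolean awaitTerminated(Duration budget) {
    try {
      terminated.get(budget.toMillis(), TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }
}
```

A closing client would call `close()` on every pool first, then `awaitTerminated` on each, and only afterwards tear down the shared executors.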

VOperationImpl.start queues grpcContext.addListener through the op executor
via runInline. Without this, an async-queued onClose from chain.start
(PendingVRpc pool-closed fast-fail, VRpcImpl deadline-exceeded short-circuit)
could drain between the CleanupListener.closed read and addListener:
CleanupListener.onClose would call removeListener as a no-op pre-registration,
and the caller would then register a listener with nothing to remove it.
The leak is per-RPC and permanent until grpcContext cancels — for long-lived
application contexts it accumulates indefinitely. FIFO ordering through the
op executor makes the closed-check sound: any onClose chain.start enqueued
drains first, so the check is accurate by the time we evaluate it.

Add a `closed` AtomicBoolean + checkNotClosed() guard on the three openers
so concurrent opens during shutdown can't create pools the close path
won't see. close() is now idempotent via CAS on that flag.

Tests:
  - ClientTest#openAfterCloseThrows / closeIsIdempotent
  - SessionPoolImplTest awaitTerminated* coverage
  - VOperationImplTest covering async onClose / context cancel ordering
  - SessionPoolImplTest tearDown now calls awaitTerminated so the watchdog
    is closed before testTimer.stop races its self-reschedule
  - FakeSessionPool in TableBaseTest gains a no-op awaitTerminated stub

Drive-by: remove the spurious @Nested annotation from SessionPoolImplTest's
top-level class. @Nested is meaningful only on non-static inner classes;
on the outer class it caused Surefire to mis-attribute test counts.

…close

Scheduled RetryingVRpcs hold no session reference, so a long-delay
retry (server-driven RetryInfo.retryDelay) outlives Phase 2 drain and
is silently discarded when sessionTimer.stop() runs in Phase 3. The
user's listener never fires.

Add an onStop hook primitive to BigtableTimer. Scheduled.onStart
registers a hook on entry and unregisters on every exit path (normal
fire, cancel, hook fire). NettyWheelTimer.stop() runs every hook
synchronously before discarding pending wheel timeouts; hooks
trampoline back through the op executor to drive Scheduled to a
CANCELLED Done.
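
A stop-hook registry of this kind could be sketched as below. `SketchTimer`, `addStopHook`, and the returned unregister handle are hypothetical names; the sketch leaves out the wheel itself and the trampoline through the op executor, and shows only the register, unregister, and run-on-stop behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stop-hook registry; the timing wheel itself is omitted.
final class SketchTimer {
  private final Set<Runnable> stopHooks = new LinkedHashSet<>();
  private boolean stopped;

  /** Registers a hook and returns a handle that unregisters it. */
  synchronized Runnable addStopHook(Runnable hook) {
    if (stopped) {
      throw new IllegalStateException("timer already stopped");
    }
    stopHooks.add(hook);
    return () -> removeStopHook(hook);
  }

  private synchronized void removeStopHook(Runnable hook) {
    stopHooks.remove(hook);
  }

  /** Runs every still-registered hook once, before pending work is discarded. */
  void stop() {
    List<Runnable> toRun;
    synchronized (this) {
      if (stopped) {
        return;
      }
      stopped = true;
      toRun = new ArrayList<>(stopHooks);
      stopHooks.clear();
    }
    for (Runnable hook : toRun) {
      hook.run(); // outside the lock so a hook may call back into the timer
    }
  }
}
```

A scheduled retry would register a hook on entry, keep the handle, and run it on every exit path so that only retries still pending at `stop()` are driven to cancellation.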

Reorder Client.close Phase 3 so sessionTimer.stop() runs before
userCallbackExecutor.close(), giving the hook-fired onClose tasks a
live op-executor backing to land on.

Also replace Scheduled.onStart's dead RejectedExecutionException catch
with IllegalStateException, matching BigtableTimer.stop()'s documented
post-condition.

openTableAsync / openAuthorizedViewAsync / openMaterializedViewAsync
read 'closed' lock-free, then constructed the pool, then inserted it
into sessionPools — close() could CAS closed=true and snapshot
sessionPools in between, leaving the new pool orphaned: never closed,
its callbacks landing on shut-down executors.

Hold sessionPools' monitor across the closed check, the construction,
and the insert. Opens are infrequent (typically once per table at app
startup) so the monitor cost is negligible.

Move close()'s closed flip inside the same monitor too. With every
access now under the lock, downgrade 'closed' from AtomicBoolean to a
plain boolean — the CAS provided no value over a plain read+write
under the lock.
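
The check-construct-insert sequence under one monitor can be sketched as follows. `SketchClient` is a hypothetical stand-in that uses `Object` in place of a real session pool and returns the close-time snapshot so the behavior is observable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in; Object takes the place of a real session pool.
final class SketchClient {
  private final Map<String, Object> sessionPools = new HashMap<>();
  private boolean closed; // plain boolean: only touched under sessionPools' monitor

  Object openTable(String tableId) {
    synchronized (sessionPools) {
      // Check, construct, and insert atomically with respect to close().
      if (closed) {
        throw new IllegalStateException("client is closed");
      }
      Object pool = new Object();
      sessionPools.put(tableId, pool);
      return pool;
    }
  }

  /** Returns the pools this call is responsible for closing; empty when already closed. */
  List<Object> close() {
    synchronized (sessionPools) {
      if (closed) {
        return new ArrayList<>();
      }
      closed = true;
      return new ArrayList<>(sessionPools.values());
    }
  }
}
```

Because the flag flip and the snapshot share the monitor with the opener, a pool is either in the snapshot or never created.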

ShimImpl registered its userCallbackExecutor with a bare shutdown() closer,
while Client.create's path uses shutdownAndAwait (which gives in-flight
listener.onClose tasks a 5-second drain window before shutdownNow). On
ShimImpl close, queued callbacks were abandoned mid-flight — fine for
quiescent shutdowns but a regression for fast-close patterns (test boundaries,
dynamic config reloads) where in-flight callbacks have not yet drained.

Promote Client.shutdownAndAwait from private-static to public-static so
ShimImpl (different package) can reuse the same shutdown semantics, and
update ShimImpl to call it.

ReadRowShim and MutateRowShim cache per-target handles in a Guava
LoadingCache whose loader calls Client.openTableAsync (and friends).
After the Client-close hardening, that loader throws
IllegalStateException post-close — and getUnchecked wraps it in
UncheckedExecutionException, so callers see an unchecked exception
thrown out of readRow / mutateRow instead of a failed CompletableFuture
as the surface contract promises.

Introduce SessionPoolMap, a small wrapper over the existing
Util.createSessionMap cache that owns the conversion: get() unwraps
UncheckedExecutionException to surface the original cause, and apply()
converts a loader throw into a failed CompletableFuture for the async
call paths.
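
The apply() conversion can be illustrated with the JDK alone. `SketchSessionPoolMap` below is a hypothetical reduction: a plain `Function` stands in for the Guava cache and its loader, so the `UncheckedExecutionException` unwrapping done by get() is not shown, and the method shape is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical reduction: a Function stands in for the cache and its loader.
final class SketchSessionPoolMap<K, V> {
  private final Function<K, V> loader;

  SketchSessionPoolMap(Function<K, V> loader) {
    this.loader = loader;
  }

  /** Turns a loader throw into a failed future instead of a thrown exception. */
  <R> CompletableFuture<R> apply(K key, Function<V, CompletableFuture<R>> call) {
    V pool;
    try {
      pool = loader.apply(key);
    } catch (RuntimeException e) {
      return CompletableFuture.failedFuture(e); // Java 9+
    }
    return call.apply(pool);
  }
}
```

Callers of the async path then always receive a future, whether or not the client has already been closed.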

Replace the inline LoadingCache fields in both shim ops files.
Covered by a new focused unit test.

… Active

State machine now has an onExit hook called from onStateChange before the
swap. States that hold cleanup-worthy resources override it.

Active owns its own tracer-pairing flag + finishAttempt helper (no longer
a RetryingVRpc-level bool). Listener paths and onCancel call finishAttempt
with the right result; onExit is a safety net that guarantees the pairing
is balanced even if a future exit path forgets. Fixes the
attempt.start synchronous-throw leak.
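
The pairing guarantee can be sketched with a tiny state machine. `SketchRetryStates` and its nested types are hypothetical, and a `Runnable` stands in for the tracer's attempt-finish callback; the point is only that `finishAttempt` is idempotent and `onExit` calls it as a backstop.

```java
// Hypothetical state machine; a Runnable stands in for tracer.onAttemptFinish.
final class SketchRetryStates {
  interface State {
    /** Called before the machine swaps this state out. */
    default void onExit() {}
  }

  static final class Active implements State {
    private final Runnable onAttemptFinish;
    private boolean attemptOpen = true;

    Active(Runnable onAttemptFinish) {
      this.onAttemptFinish = onAttemptFinish;
    }

    /** Idempotent: the finish callback fires at most once per attempt. */
    void finishAttempt() {
      if (!attemptOpen) {
        return;
      }
      attemptOpen = false;
      onAttemptFinish.run();
    }

    @Override
    public void onExit() {
      finishAttempt(); // safety net for exit paths that forgot to finish
    }
  }

  static final class Done implements State {}

  private State current;

  SketchRetryStates(State initial) {
    this.current = initial;
  }

  void onStateChange(State next) {
    current.onExit();
    current = next;
  }
}
```

Whether an exit path finishes the attempt explicitly or forgets to, each `Active` state reports exactly one finish.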

Also resolves two latent tracer hazards:
- tracer.onAttemptFinish now fires when an in-flight attempt is cancelled
  (previously the late server onClose was dropped by the stale-state guard,
  so the cancelled attempt's tracer span leaked).
- The listener-path tracer.onAttemptFinish is gated on the stale-state
  check first, matching onMessage above — a discarded onClose can no
  longer double-fire the tracer.

Scheduled cleanup (timer + stop-hook unregister) consolidates from three
parallel sites (onCancel, timer-fire body, stop-hook body) into one onExit.
onCancel drops to the default no-op.

Done.onStart no longer balances per-attempt tracer state — that lives on
Active now.

VOperationImpl.start needs to detect whether chain.start synchronously
delivered a terminal onClose, so it can skip registering the gRPC
cancellation listener (which would otherwise leak onto grpcContext).
Previously this was tracked via a 'closed' flag on CleanupListener — a
piggyback bookkeeping field on the listener wrapper that exists only
for VOperationImpl's coordination.

Add isDone() to the VRpc interface and ask the chain directly. The
chain is the natural source of truth for its own terminal state.
CleanupListener shrinks back to its single concern: relay events and
unhook the gRPC cancellation listener on close.

Implementations: RetryingVRpc delegates to currentState.isDone();
VRpcImpl reports state==CLOSED; ForwardingVRpc forwards; PendingVRpc
defers to realCall once handed off, otherwise reports isCancelled.
Test fakes (DelayedVRpc, FakeVRpc, anonymous VOperationImplTest chains)
implement the new method.

Drive-by: drop the defensive handling of tracer.onOperationStart throws
from RetryingVRpc.start, and the symmetric `!started` early-return in
Done.onStart that paired with it. CompositeVRpcTracer catches throws
from every child tracer, so the only way tracer.onOperationStart reaches
RetryingVRpc with a real throw is a test that bypasses Composite. Dead
code in production; relying on the existing chain.cancel cascade is
simpler than maintaining a separate short-circuit path.

Also: the already-started error in RetryingVRpc.start now dispatches
listener.onClose through ctx.getExecutor() rather than invoking it
synchronously on the caller, matching the dispatch convention used
everywhere else in the chain.

Inline doc: clarify why OpExecutor is not SynchronizationContext. They
look superficially similar (FIFO + drain + affinity assertion + uncaught
handler) but have opposite drain-thread policies. SynchronizationContext
drains on whichever thread first calls execute() while idle — appropriate
for state serialization on threads that should do that work anyway.
OpExecutor always hands off to the backing user-callback pool so chain
callbacks never run on transport / session-sync / timer-dispatch threads.

TODO entries for deferred review findings:
- closeReason synthesizer triplication in SessionImpl
- drainedFuture completed from two unrelated sites
- shutdownAndAwait as public-static helper in the wrong place
- RetryingVRpc relies on unenforced op-executor affinity (more important
  now that chain.isDone is externally observable)
- per-op tracking for graceful shutdown of pending Scheduled retries
  (explicitly chose cancel-on-close; this is the path if requirements
  change)
- SessionImpl.checkHeartbeat re-arms a 100ms wheel tick unconditionally;
  for idle sessions this is 10 wakeups/sec/session of noise

Each entry has a file pointer, symptom, fix sketch, and risk.

igorbernstein2 force-pushed the threading-refactor-v2 branch from 5e619a4 to fe812c6 (June 17, 2026 20:04)